08. Arm Mover: The Code
Arm Mover: The Code
arm_mover
#!/usr/bin/env python
import math
import rospy
from std_msgs.msg import Float64
from sensor_msgs.msg import JointState
from simple_arm.srv import *
def at_goal(pos_j1, goal_j1, pos_j2, goal_j2):
tolerance = .05
result = abs(pos_j1 - goal_j1) <= abs(tolerance)
result = result and abs(pos_j2 - goal_j2) <= abs(tolerance)
return result
def clamp_at_boundaries(requested_j1, requested_j2):
clamped_j1 = requested_j1
clamped_j2 = requested_j2
min_j1 = rospy.get_param('~min_joint_1_angle', 0)
max_j1 = rospy.get_param('~max_joint_1_angle', 2*math.pi)
min_j2 = rospy.get_param('~min_joint_2_angle', 0)
max_j2 = rospy.get_param('~max_joint_2_angle', 2*math.pi)
if not min_j1 <= requested_j1 <= max_j1:
clamped_j1 = min(max(requested_j1, min_j1), max_j1)
rospy.logwarn('j1 is out of bounds, valid range (%s,%s), clamping to: %s',
min_j1, max_j1, clamped_j1)
if not min_j2 <= requested_j2 <= max_j2:
clamped_j2 = min(max(requested_j2, min_j2), max_j2)
rospy.logwarn('j2 is out of bounds, valid range (%s,%s), clamping to: %s',
min_j2, max_j2, clamped_j2)
return clamped_j1, clamped_j2
def move_arm(pos_j1, pos_j2):
time_elapsed = rospy.Time.now()
j1_publisher.publish(pos_j1)
j2_publisher.publish(pos_j2)
while True:
joint_state = rospy.wait_for_message('/simple_arm/joint_states', JointState)
if at_goal(joint_state.position[0], pos_j1, joint_state.position[1], pos_j2):
time_elapsed = joint_state.header.stamp - time_elapsed
break
return time_elapsed
def handle_safe_move_request(req):
rospy.loginfo('GoToPositionRequest Received - j1:%s, j2:%s',
req.joint_1, req.joint_2)
clamp_j1, clamp_j2 = clamp_at_boundaries(req.joint_1, req.joint_2)
time_elapsed = move_arm(clamp_j1, clamp_j2)
return GoToPositionResponse(time_elapsed)
def mover_service():
rospy.init_node('arm_mover')
service = rospy.Service('~safe_move', GoToPosition, handle_safe_move_request)
rospy.spin()
if __name__ == '__main__':
j1_publisher = rospy.Publisher('/simple_arm/joint_1_position_controller/command',
Float64, queue_size=10)
j2_publisher = rospy.Publisher('/simple_arm/joint_2_position_controller/command',
Float64, queue_size=10)
try:
mover_service()
except rospy.ROSInterruptException:
pass
L3 Arm Mover The Code
The code: explained
#!/usr/bin/env python
import math
import rospy
from std_msgs.msg import Float64
from sensor_msgs.msg import JointState
from simple_arm.srv import *
The imported modules for
arm_mover
are the same as
simple_arm
, with the exception of two new imports. Namely, the
JointState
message, and the
simple_arm.srv
module.
JointState
messages are published to the
/simple_arm/joint_states
topic, and are used for monitoring the position of the arm.
The
simple_arm
package, and the
srv
module are automatically generated by catkin as part of the build process.
def at_goal(pos_j1, goal_j1, pos_j2, goal_j2):
tolerance = .05
result = abs(pos_j1 - goal_j1) <= abs(tolerance)
result = result and abs(pos_j2 - goal_j2) <= abs(tolerance)
return result
This function returns
True
if the joint positions are close to the goals. When taking measurements from sensors in the real world, there will always be some amount of noise. The same is true of the joint positions reported by the gazebo simulator. If both joint positions are within .05 radians of the goal,
True
is returned.
def clamp_at_boundaries(requested_j1, requested_j2):
clamped_j1 = requested_j1
clamped_j2 = requested_j2
clamp_at_boundaries()
is responsible for enforcing the minimum and maximum joint angles for each joint. If the joint angles passed in are outside of the operable range, they will be “clamped” to the nearest allowable value.
min_j1 = rospy.get_param('~min_joint_1_angle', 0)
max_j1 = rospy.get_param('~max_joint_1_angle', 2*math.pi)
min_j2 = rospy.get_param('~min_joint_2_angle', 0)
max_j2 = rospy.get_param('~max_joint_2_angle', 2*math.pi)
The minimum and maximum joint angles are retrieved from the parameter server each time
clamp_at_boundaries()
is called. The “~” is the private namespace qualifier, and indicates that the parameter we wish to get is within this node’s
private namespace
/arm_mover/
(e.g.
~min_joint_1_angle
resolves to
/arm_mover/min_joint_1_angle
). The second parameter is the default value to be returned, in the case that
rospy.get_param()
was unable to get the parameter from the param server.
if not min_j1 <= requested_j1 <= max_j1:
clamped_j1 = min(max(requested_j1, min_j1), max_j1)
rospy.logwarn('j1 is out of bounds, valid range (%s,%s), clamping to: %s',
min_j1, max_j1, clamped_j1)
if not min_j2 <= requested_j2 <= max_j2:
clamped_j2 = min(max(requested_j2, min_j2), max_j2)
rospy.logwarn('j2 is out of bounds, valid range (%s,%s), clamping to: %s',
min_j2, max_j2, clamped_j2)
return clamped_j1, clamped_j2
The rest of this function simply clamps the joint angle if necessary. Warning messages are logged if the requested joint angles are out of bounds.
def move_arm(pos_j1, pos_j2):
time_elapsed = rospy.Time.now()
j1_publisher.publish(pos_j1)
j2_publisher.publish(pos_j2)
while True:
joint_state = rospy.wait_for_message('/simple_arm/joint_states', JointState)
if at_goal(joint_state.position[0], pos_j1, joint_state.position[1], pos_j2):
time_elapsed = joint_state.header.stamp - time_elapsed
break
return time_elapsed
move_arm()
commands the arm to move, returning the amount of time that elapsed while the arm was moving.
Note:
Within the function we are using the
rospy.wait_for_message()
call to receive
JointState
messages from the
/simple_arm/joint_states
topic. This is blocking function call, meaning that it will not return until a message has been received on the
/simple_arm/joint_states
topic.
In general, you should not use
wait_for_message()
. We simply use it here for the sake of clarity, and because move_arm is being called from the
handle_safe_move_request()
function, which demands that the response message is passed back as a return parameter. More discussion on this below.
def handle_safe_move_request(req):
rospy.loginfo('GoToPositionRequest Received - j1:%s, j2:%s',
req.joint_1, req.joint_2)
clamp_j1, clamp_j2 = clamp_at_boundaries(req.joint_1, req.joint_2)
time_elapsed = move_arm(clamp_j1, clamp_j2)
return GoToPositionResponse(time_elapsed)
This is the service handler function. When a service client sends a
GoToPosition
request message to the
safe_move
service, this function is called. The function parameter req is of type
GoToPositionRequest
. The service response is of type
GoToPositionResponse
.
This is the service handler function, it is called whenever a new service request is received. The response to the service request is returned from the function.
Note:
move_arm()
is blocking, and will not return until the arm has finished its movement. Incoming messages cannot be processed, and no other useful work can be done in the python script while the arm is performing it’s movement command. While this poses no real problem for this example, it is a practice that should generally be avoided. One great way to avoid blocking the thread of execution would be to use
Action
. Here’s some
informative documentation
describing when it’s best to use a Topic versus a Service, versus an Action.
def mover_service():
rospy.init_node('arm_mover')
service = rospy.Service('~safe_move', GoToPosition, handle_safe_move_request)
rospy.spin()
Here the node is initialized with the name “arm_mover”, and the
GoToPosition
service is created with the name “safe_move”. As mentioned previously, the “~” qualifier identifies that safe_move is meant to belong to this node’s private namespace. The resulting service name will be
/arm_mover/safe_move
. The third parameter to the
rospy.Service()
call is the function that should be called when a service request is received. Lastly,
rospy.spin()
simply blocks until a shutdown request is received by the node. Failure to include this line would result in
mover_service()
returning, and the script completing execution.
if __name__ == '__main__':
j1_publisher = rospy.Publisher('/simple_arm/joint_1_position_controller/command', Float64, queue_size=10)
j2_publisher = rospy.Publisher('/simple_arm/joint_2_position_controller/command', Float64, queue_size=10)
try:
mover_service()
except rospy.ROSInterruptException:
pass
This section of code is similar, to that of
simple_mover()
.
Next steps
Now that you've written the
arm_mover
node, the next step is to launch it, and then test it out by interacting with the service via the command line!